<?php
/**
 * Created by PhpStorm.
 * User: longli
 * VX: isa1589518286
 * Date: 2019/07/18
 * Time: 9:58
 * @link http://www.lmterp.cn
 */

namespace app\common\library\kafka;

use app\common\library\Tools;
use RdKafka\Conf;
use RdKafka\Exception;
use RdKafka\Message;
use RdKafka\TopicConf;

/**
 * kafka 消费者客户端
 * Class KafkaConsumer
 * @package app\common\library\kafka
 */
class KafkaConsumer
{

    /**
     * 消费者对象
     * @var \RdKafka\KafkaConsumer
     */
    protected $consumer;

    /**
     * 当前消费的主题
     * @var string
     */
    protected $currentTopic = null;

    /**
     * 默认主题
     * @var string
     */
    protected $defaultTopic = 'default';

    /**
     * 消息对象
     * @var Message
     */
    protected $message;

    /**
     * 配置信息
     * @var array
     */
    protected $config = [];

    /**
     * 订阅列表
     * @var array
     */
    protected $subscribe = [];

    public function __construct(array $config = [])
    {
        $this->config = array_merge($this->config, $config);
        $this->init();
    }

    /**
     * 消费
     * @param string|array $topics 消费主题
     * @return array|null 获取消费信息
     * @date 2019/07/18
     * @author longli
     */
    public function pop($topics = "")
    {
        try
        {
            if(empty($topics) && empty($this->subscribe)) $topics = $this->getDefaultTopic();
            if(!is_array($topics)) $topics = [$topics];
            foreach($topics as $topic)
            {
                if(empty($topic)) continue;
                if(!in_array($topic, $this->subscribe))
                {
                    $this->subscribe[] = $topic;
                    $this->consumer->subscribe($this->subscribe);
                }
            }
            $this->message = $this->consumer->consume(3000);
            if($this->message === null)
                return null;
            $this->setCurrentTopic($this->message->topic_name);
            switch($this->message->err)
            {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    return $this->getPayload();
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    break;
                default:
                    break;
            }
        }catch(Exception $exception)
        {}
    }

    /**
     * 获取消费信息
     * @return array
     * @date 2019/07/18
     * @author longli
     */
    public function getPayload()
    {
        return Tools::isJson($this->message->payload, $json)
            ? $json
            : [];
    }

    /**
     * 获取键
     * @return string
     * @date 2019/07/18
     * @author longli
     */
    public function getKey()
    {
        return $this->message->key;
    }

    /**
     * 初始化消费者
     * @date 2019/07/18
     * @author longli
     */
    protected function init()
    {
        $topicConf = new TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');

        $conf = new Conf();
        $conf->set('group.id', $this->config['group_id']);
        $brokers = is_array($this->config['brokers'])
            ? join(',', $this->config['brokers'])
            : $this->config['brokers'];
        $conf->set('metadata.broker.list', $brokers);
        $conf->set('offset.store.method', 'broker');
        $conf->setDefaultTopicConf($topicConf);

        $this->consumer = new \RdKafka\KafkaConsumer($conf);
    }

    /**
     * 获取配置信息
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * 获取订阅主题
     * @return array
     */
    public function getSubscribe()
    {
        return $this->subscribe;
    }

    /**
     * 设置订阅主题
     * @param array $subscribe
     */
    public function setSubscribe(array $subscribe = [])
    {
        $this->subscribe = $subscribe;
        $this->consumer->subscribe($this->subscribe);
    }

    /**
     * @return string
     */
    public function getDefaultTopic()
    {
        return $this->defaultTopic;
    }

    /**
     * @param string $defaultTopic
     */
    public function setDefaultTopic($defaultTopic)
    {
        $this->defaultTopic = $defaultTopic;
    }

    /**
     * @return string
     */
    public function getCurrentTopic()
    {
        return $this->currentTopic;
    }

    /**
     * @param string $currentTopic
     */
    public function setCurrentTopic($currentTopic)
    {
        $this->currentTopic = $currentTopic;
    }


}